package opmq

import (
	"context"
	"gitee.com/maomaomaoge/opmq/grpc_proto"
)

// OpmqRpc is the receiver type for the Pull and Push RPC methods.
// It embeds grpc_proto.UnimplementedOpmqRpcServer.
type OpmqRpc struct {
	grpc_proto.UnimplementedOpmqRpcServer
}

// Pull returns the cluster nodes currently held in Broker.ClusterSelf.Nodes.
//
// When this broker is not the master (Broker.ClusterSelf.IsMaster is false)
// the reply carries an empty, non-nil Data slice. Otherwise each entry of
// Broker.ClusterSelf.Nodes is copied into a grpc_proto.ClusterNode and
// appended to Data. Neither ctx nor null is used, and the returned error is
// always nil.
func (OpmqRpc) Pull(ctx context.Context, null *grpc_proto.Null) (*grpc_proto.PullData, error) {
	if !Broker.ClusterSelf.IsMaster {
		return &grpc_proto.PullData{
			Data: make([]*grpc_proto.ClusterNode, 0),
		}, nil
	}

	// The final length is known up front, so allocate the backing array once
	// instead of letting append grow it repeatedly.
	nodes := make([]*grpc_proto.ClusterNode, 0, len(Broker.ClusterSelf.Nodes))
	for _, v := range Broker.ClusterSelf.Nodes {
		DEBUG.Println("pull数据: ", v)
		nodes = append(nodes, &grpc_proto.ClusterNode{
			Id:    v.Id,
			Ip:    v.Ip,
			Port:  v.Port, // note: this is the node's own MQTT port
			Topic: v.Topics,
		})
	}

	return &grpc_proto.PullData{Data: nodes}, nil
}

// Push is the RPC through which a follower node sends its data to the master
// node. The node's Ip, Port, Id and Topic are handed to
// Broker.ClusterSelf.NodeJoin; the reply always carries Message "ok" and a
// nil error. ctx is not used.
func (OpmqRpc) Push(ctx context.Context, node *grpc_proto.ClusterNode) (*grpc_proto.Error, error) {
	Broker.ClusterSelf.NodeJoin(node.Ip, node.Port, node.Id, node.Topic)

	return &grpc_proto.Error{Message: "ok"}, nil
}
